package com.ld.shieldsb.canalclient.recoder;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import com.ld.shieldsb.canalclient.handler.config.MappingConfig;
import com.ld.shieldsb.canalclient.handler.config.MappingConfig.DbMapping;
import com.ld.shieldsb.canalclient.model.Dml;
import com.ld.shieldsb.canalclient.model.RecordHandlerDmlModel;
import com.ld.shieldsb.canalclient.model.RecordModel;
import com.ld.shieldsb.canalclient.model.SingleDml;
import com.ld.shieldsb.canalclient.model.SyncResult;
import com.ld.shieldsb.common.core.queue.QueueEntry;

import lombok.Data;

/**
 * 
 * 记录者
 * 
 * @ClassName Recorder
 * @author <a href="mailto:donggongai@126.com" target="_blank">吕凯</a>
 * @date 2021年12月16日 上午11:03:09
 *
 */
@Data
public class Recorder {

    private BlockingQueue<QueueEntry<Dml>> dmlQueue; // 所有dm，
    private BlockingQueue<QueueEntry<RecordHandlerDmlModel>> validDmlQueue; // 有效Dml
    private BlockingQueue<QueueEntry<RecordModel>> syncQueue; // 同步数据

    private boolean useDmlQueue;
    private boolean useValidDmlQueue;
    private boolean userSyncQueue;

    public Recorder() {
        this(1000, 1000, 1000);
    }

    public Recorder(int dmlQueueSize, int validDmlSize, int syncQueueSize) {
        if (dmlQueueSize > 0) {
            dmlQueue = new LinkedBlockingQueue<>(dmlQueueSize);
            useDmlQueue = true;
        }
        if (validDmlSize > 0) {
            validDmlQueue = new LinkedBlockingQueue<>(validDmlSize);
            useValidDmlQueue = true;
        }
        if (syncQueueSize > 0) {
            syncQueue = new LinkedBlockingQueue<>(syncQueueSize);
            userSyncQueue = true;
        }
    }

    /**
     * 记录dml的数据条数，一般是1
     * 
     * @Title record
     * @author 吕凯
     * @date 2021年12月16日 上午11:34:31
     * @param dml
     *            void
     * @throws InterruptedException
     */
    public void recordDml(Dml dml) throws InterruptedException {
        if (dmlQueue != null) {
            dmlQueue.put(new QueueEntry<>(dml.getUuid(), dml)); // 如果队列满，则阻塞
        }
    }

    /**
     * 只记录某个处理器接收到的有效Dml
     * 
     * @Title recordValidDml
     * @author 吕凯
     * @date 2021年12月16日 下午4:08:57
     * @param dml
     * @throws InterruptedException
     *             void
     */
    public void recordValidDml(Dml dml, String handlerKey) throws InterruptedException {
        if (validDmlQueue != null) {
            validDmlQueue.put(new QueueEntry<>(dml.getUuid(), RecordHandlerDmlModel.builder().dml(dml).handlerKey(handlerKey).build())); // 如果队列满，则阻塞
        }
    }

    /**
     * 记录每条数据的同步情况
     * 
     * @Title record
     * @author 吕凯
     * @date 2021年12月16日 上午11:37:20
     * @param singleDml
     * @param handlerKey
     * @param handState
     *            处理状态0失败1成功2错误3忽略
     * @param msg
     *            void
     * @throws InterruptedException
     */
    public void recordSync(SingleDml singleDml, MappingConfig config, SyncResult result) throws InterruptedException {
        if (syncQueue != null) {
            String handlerKey = config.getOuterAdapterKey();
            DbMapping mapping = config.getDbMapping();
            syncQueue.put(new QueueEntry<>(singleDml.getDmlUuid(),
                    RecordModel.builder().singleDml(singleDml).handlerKey(handlerKey).handState(result.getState())
                            .targetDb(mapping.getTargetDb()).targetTable(mapping.getTargetTable()).msg(result.getMsg()).build()));
        }
    }

    /**
     * 获取dml数量记录
     * 
     * @Title takeDmlCountRecord
     * @author 吕凯
     * @date 2021年12月16日 下午12:01:09
     * @return
     * @throws InterruptedException
     *             QueueEntry<Integer>
     */
    public QueueEntry<Dml> takeDmlRecord() throws InterruptedException {
        if (dmlQueue != null) {
            return dmlQueue.take();
        }
        return null;
    }

    /**
     * 获取处理器的有效dml
     * 
     * @Title takeValidDmlRecord
     * @author 吕凯
     * @date 2021年12月16日 下午3:55:57
     * @return
     * @throws InterruptedException
     *             QueueEntry<Dml>
     */
    public QueueEntry<RecordHandlerDmlModel> takeValidDmlRecord() throws InterruptedException {
        if (validDmlQueue != null) {
            return validDmlQueue.take();
        }
        return null;
    }

    /**
     * 获取同步记录
     * 
     * @Title takeSyncRecord
     * @author 吕凯
     * @date 2021年12月16日 下午12:01:22
     * @return
     * @throws InterruptedException
     *             QueueEntry<RecordModel>
     */
    public QueueEntry<RecordModel> takeSyncRecord() throws InterruptedException {
        if (syncQueue != null) {
            return syncQueue.take();

        }
        return null;
    }

}
